package org.voovan.korla.socket;

import org.voovan.korla.KorlaStatic;
import org.voovan.korla.exception.KorlaException;
import org.voovan.korla.message.Callback;
import org.voovan.korla.message.Msg;
import org.voovan.network.ConnectModel;
import org.voovan.network.IoFilter;
import org.voovan.network.IoSession;
import org.voovan.network.exception.IoFilterException;
import org.voovan.network.exception.SendMessageException;
import org.voovan.tools.log.Logger;
import org.voovan.tools.serialize.TSerialize;

import static org.voovan.korla.KorlaStatic.*;
import static org.voovan.tools.TObject.nullDefault;

/**
 * Korla 过滤器
 *
 * @author: helyho
 * korla Framework.
 * WebSite: https://github.com/helyho/korla
 * Licence: Apache v2 License
 */
public class KorlaFilter implements IoFilter {
    public static int TYPE_ENCODE = 0;
    public static int TYPE_DECODE = 1;

    static {
        KorlaStatic.registerClass();
    }

    @Override
    public Object decode(IoSession ioSession, Object o) throws IoFilterException {
        if(o instanceof byte[]) {
            Msg msg = (Msg) TSerialize.unserialize((byte[]) o);

            //注册消息不处理
            if(!msg.isRegMsg()) {
                msg.setSource(Msg.MSG_SOURCE_RCV);

                //幂等响应拦截
                if (msg.isIdempotence() && ioSession.socketContext().getConnectModel() == ConnectModel.SERVER) {
                    Msg idempotenceMsg = MESSAGE_IDEMPOTENCE.get(msg.getId());
                    if (idempotenceMsg != null) {
                        //幂等存在直接响应消息, 不进入业务处理
                        try {
                            idempotenceMsg.setDone(true);
                            ioSession.syncSend(idempotenceMsg);
                        } catch (SendMessageException e) {
                            Logger.error("[Korla] KorlaFilter.decode has error", e);
                        }
                        return null;
                    }
                }

                fillCache(msg, TYPE_DECODE);
            }

            return msg;
        } else {
            throw new IoFilterException("SerializeFilter.decode data is not a object of byte[].class");
        }
    }

    @Override
    public Object encode(IoSession ioSession, Object o) throws IoFilterException {
        if(o == null) {
            return null;
        }

        if(o instanceof Msg) {
            Msg msg = (Msg) o;

            //注册消息不处理
            if(!msg.isRegMsg()) {
                msg.setSource(Msg.MSG_SOURCE_SND);

                fillMsg(ioSession, msg);

                //保存幂等信息, 只保留最后一次的
                if (msg.isIdempotence() && ioSession.socketContext().getConnectModel() == ConnectModel.SERVER) {
                    MESSAGE_IDEMPOTENCE.put(msg.getId(), msg);
                }

                fillCache(msg, TYPE_ENCODE);
            }

            return serializeForSocket(msg);
        } else {
            throw new IoFilterException("SerializeFilter.decode data is not a object of Msg.class");
        }
    }

    /**
     * 针对 socket 发送的序列化
     *      这里会将 callback 和 ack 暂时设置为 null, 序列化完成后再还原
     * @param msg 消息对象
     * @return 序列化的数据
     */
    public byte[] serializeForSocket(Msg msg) {
        Integer source = msg.getSource();
        Long timestamp = msg.getTimestamp();
        Callback callBack = msg.getCallBack();

        msg.setSource(null);
        msg.setTimestamp(null);
        msg.setCallBack(null);

        byte[] result = TSerialize.serialize(msg);

        msg.setSource(source);
        msg.setTimestamp(timestamp);
        msg.setCallBack(callBack);

        return result;
    }

    /**
     * 发送请求补充消息的必要信息
     *      1.通道名称
     *      2.时间戳
     * @param ioSession 会话对象
     * @param msg
     */
    public void fillMsg(IoSession ioSession, Msg msg) {
        Object sessionChannel = ioSession.getAttribute(CHANNEL_STR);
        if (msg.getChannel() == null) {
            msg.setChannel(nullDefault(sessionChannel, DEFAULT_CHANNEL).toString());
        } else if (sessionChannel!=null && !msg.getChannel().equals(sessionChannel)) {
            throw new KorlaException("Msg's channel is different with session's channel");
        }
    }

    /**
     * 消息对象的缓存处理
     * @param msg 消息对象
     * @param type 接收类型
     */
    public void fillCache(Msg msg, int type) {
        if(msg==null) {
            return;
        }

        msg.updateTimestamp();

        //==========================================纯内存处理==========================================
        Msg memCachedMsg = MESSAGE_CACHE.get(msg.getId());

        if(memCachedMsg!=null) {
            //原始消息向收到的消息传递回调, 实际业务处理的都是收到的消息
            if (type == TYPE_DECODE) {
                msg.setCallBack(memCachedMsg.getCallBack());
            }
        }

        //缓存数据
        if(msg.isNormalMsg() && !msg.isDone()) {
            //=============持久化============
            MESSAGE.put(msg.getId(), msg);
            //=============内存============
            MESSAGE_CACHE.put(msg.getId(), msg);
        }

        //清理结束消息, 场景是发送作为最后一条消息
        if (type == TYPE_ENCODE) {
            Msg.removeFromCache(msg);
        }
    }
}
